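Client-side request handling works as follows: the CLI accepts interactive or non-interactive commands and sends them to the server (the coordinator) through an HTTP client. It then uses the same HTTP client to fetch execution status updates, which it prints for the user, and finally collects the results and prints them. The Presto CLI client is started from the following source file: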
presto-cli\src\main\java\com\facebook\presto\cli\Presto.java
public final class Presto
{
private Presto() {}
public static void main(String[] args)
throws Exception
{
Console console = singleCommand(Console.class).parse(args);
if (console.helpOption.showHelpIfRequested()) {
return;
}
console.run();
}
}
The source file for the Console class is presto-cli\src\main\java\com\facebook\presto\cli\Console.java. The method in that class that processes a query is:
private static void process(QueryRunner queryRunner, String sql, OutputFormat outputFormat, boolean interactive)
{
try (Query query = queryRunner.startQuery(sql)) {
query.renderOutput(System.out, outputFormat, interactive);
// update session properties if present
if (!query.getSetSessionProperties().isEmpty() || !query.getResetSessionProperties().isEmpty()) {
Map<String, String> sessionProperties = new HashMap<>(queryRunner.getSession().getProperties());
sessionProperties.putAll(query.getSetSessionProperties());
sessionProperties.keySet().removeAll(query.getResetSessionProperties());
queryRunner.setSession(withProperties(queryRunner.getSession(), sessionProperties));
}
}
catch (RuntimeException e) {
System.out.println("Error running command: " + e.getMessage());
if (queryRunner.getSession().isDebug()) {
e.printStackTrace();
}
}
}
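The session-property update inside process() can be looked at in isolation. The following is a minimal sketch using plain maps and sets; the class SessionPropertyMerge and its merge method are hypothetical names introduced here for illustration, not part of Presto, and the property names in main are sample values only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of Presto): mirrors the merge order used in process().
final class SessionPropertyMerge
{
    private SessionPropertyMerge() {}

    static Map<String, String> merge(Map<String, String> current, Map<String, String> set, Set<String> reset)
    {
        // copy first so the caller's map is left untouched
        Map<String, String> merged = new HashMap<>(current);
        // properties set by the query override existing values
        merged.putAll(set);
        // properties reset by the query are dropped entirely
        merged.keySet().removeAll(reset);
        return merged;
    }

    public static void main(String[] args)
    {
        Map<String, String> current = new HashMap<>();
        current.put("sample_property_a", "1");
        current.put("sample_property_b", "2");
        System.out.println(merge(current, Map.of("sample_property_a", "9"), Set.of("sample_property_b")));
    }
}
```

Because removal happens after putAll, a property that appears in both the set and the reset collections ends up removed.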
Related source files: Query.java, StatementClient.java, Console.java, QueryRunner.java
Client initialization: console.run()
@Override
public void run()
{
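// load the login options (--catalog, --server, --schema, log level, etc.) into a ClientSession; Appendix 1 lists the client's default session values
ClientSession session = clientOptions.toClientSession();
// query passed on the command line via --execute
boolean hasQuery = !Strings.isNullOrEmpty(clientOptions.execute);
// file to execute, passed via --file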
boolean isFromFile = !Strings.isNullOrEmpty(clientOptions.file);
if (!hasQuery || !isFromFile) {
AnsiConsole.systemInstall();
}
initializeLogging(session.isDebug());
String query = clientOptions.execute;
if (hasQuery) {
query += ";";
}
// conflict check: --execute and --file cannot be used together
if (isFromFile) {
if (hasQuery) {
throw new RuntimeException("both --execute and --file specified");
}
try {
query = Files.toString(new File(clientOptions.file), Charsets.UTF_8);
hasQuery = true;
}
catch (IOException e) {
throw new RuntimeException(format("Error reading from file %s: %s", clientOptions.file, e.getMessage()));
}
}
// if there is a query, execute it; otherwise start the interactive console
try (QueryRunner queryRunner = QueryRunner.create(session)) {
if (hasQuery) {
executeCommand(queryRunner, query, clientOptions.outputFormat);
}
else {
runConsole(queryRunner, session);
}
}
}
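The decision made in run() between --execute, --file, and the interactive console can be condensed into one small function. This is a sketch under stated assumptions: QuerySource and resolve are hypothetical names, fileContents stands in for text that has already been read from the --file path (null when --file was not given), and IllegalArgumentException is used where the original throws a plain RuntimeException.

```java
import java.util.Optional;

// Hypothetical helper (not part of Presto): condenses the --execute / --file handling in run().
final class QuerySource
{
    private QuerySource() {}

    // Returns the SQL text to run, or empty when the interactive console should start.
    static Optional<String> resolve(String execute, String fileContents)
    {
        boolean hasQuery = execute != null && !execute.isEmpty();
        boolean isFromFile = fileContents != null;
        if (hasQuery && isFromFile) {
            // the two options are mutually exclusive
            throw new IllegalArgumentException("both --execute and --file specified");
        }
        if (hasQuery) {
            // run() appends a terminator so the statement is always complete
            return Optional.of(execute + ";");
        }
        if (isFromFile) {
            return Optional.of(fileContents);
        }
        return Optional.empty();
    }

    public static void main(String[] args)
    {
        System.out.println(resolve("select 1", null));
        System.out.println(resolve(null, null));
    }
}
```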
Running the interactive command line: com.facebook.presto.cli.Console.runConsole(QueryRunner queryRunner, ClientSession session)
private static void runConsole(QueryRunner queryRunner, ClientSession session)
{
try (TableNameCompleter tableNameCompleter = new TableNameCompleter(queryRunner);
LineReader reader = new LineReader(getHistory(), tableNameCompleter)) {
tableNameCompleter.populateCache();
StringBuilder buffer = new StringBuilder();
while (true) {
// read a line of input from user
String prompt = PROMPT_NAME + ":" + session.getSchema();
if (buffer.length() > 0) {
prompt = Strings.repeat(" ", prompt.length() - 1) + "-";
}
String line = reader.readLine(prompt + "> ");
// add buffer to history and clear on user interrupt
if (reader.interrupted()) {
String partial = squeezeStatement(buffer.toString());
if (!partial.isEmpty()) {
reader.getHistory().add(partial);
}
buffer = new StringBuilder();
continue;
}
// exit on EOF
if (line == null) {
return;
}
// check for special commands if this is the first line
if (buffer.length() == 0) {
String command = line.trim();
if (command.endsWith(";")) {
command = command.substring(0, command.length() - 1).trim();
}
switch (command.toLowerCase()) {
case "exit":
case "quit":
return;
case "help":
System.out.println();
System.out.println(getHelpText());
continue;
}
}
// not a command, add line to buffer
buffer.append(line).append("\n");
// execute any complete statements
String sql = buffer.toString();
StatementSplitter splitter = new StatementSplitter(sql, ImmutableSet.of(";", "\\G"));
for (Statement split : splitter.getCompleteStatements()) {
// println rather than printf: a '%' in the SQL text would be parsed as a format specifier
System.out.println("Execute query: " + split.statement());
Optional<Object> statement = getParsedStatement(split.statement());
if (statement.isPresent() && isSessionParameterChange(statement.get())) {
session = processSessionParameterChange(statement.get(), session);
queryRunner.setSession(session);
tableNameCompleter.populateCache();
}
else {
OutputFormat outputFormat = OutputFormat.ALIGNED;
if (split.terminator().equals("\\G")) {
outputFormat = OutputFormat.VERTICAL;
}
process(queryRunner, split.statement(), outputFormat, true);
}
reader.getHistory().add(squeezeStatement(split.statement()) + split.terminator());
}
// replace buffer with trailing partial statement
buffer = new StringBuilder();
String partial = splitter.getPartialStatement();
if (!partial.isEmpty()) {
buffer.append(partial).append('\n');
}
}
}
catch (IOException e) {
System.err.println("Readline error: " + e.getMessage());
}
}
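The buffering in runConsole (append each input line, execute every complete statement, keep the trailing partial statement for the next line) can be tried out with a much simpler splitter. The sketch below is a simplified stand-in, not the real StatementSplitter: it splits on ';' only and ignores quoting, comments, and the \G terminator. SimpleSplitter and feed are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for StatementSplitter (assumption: ';' is the only terminator).
final class SimpleSplitter
{
    private final List<String> complete = new ArrayList<>();
    private final String partial;

    SimpleSplitter(String sql)
    {
        int start = 0;
        int end;
        while ((end = sql.indexOf(';', start)) >= 0) {
            String statement = sql.substring(start, end).trim();
            if (!statement.isEmpty()) {
                complete.add(statement);
            }
            start = end + 1;
        }
        // whatever follows the last terminator is still incomplete
        partial = sql.substring(start).trim();
    }

    List<String> getCompleteStatements()
    {
        return complete;
    }

    String getPartialStatement()
    {
        return partial;
    }

    // Replays the buffer handling of runConsole over a fixed list of input lines
    // and returns the statements that would have been executed.
    static List<String> feed(List<String> lines)
    {
        List<String> executed = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        for (String line : lines) {
            buffer.append(line).append("\n");
            SimpleSplitter splitter = new SimpleSplitter(buffer.toString());
            executed.addAll(splitter.getCompleteStatements());
            // replace the buffer with the trailing partial statement
            buffer = new StringBuilder();
            if (!splitter.getPartialStatement().isEmpty()) {
                buffer.append(splitter.getPartialStatement()).append('\n');
            }
        }
        return executed;
    }

    public static void main(String[] args)
    {
        System.out.println(feed(List.of("select 1", "from t; select", "2;")));
    }
}
```

A statement that spans several input lines is executed only once its terminator arrives, which is also why the real console switches to a continuation prompt while the buffer is non-empty.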
Sending the request to the coordinator:
The method com.facebook.presto.cli.QueryRunner.startInternalQuery(String query) creates a StatementClient object, and the StatementClient constructor sends the request to the coordinator.
public StatementClient(HttpClient httpClient, JsonCodec<QueryResults> queryResultsCodec, ClientSession session, String query)
{
checkNotNull(httpClient, "httpClient is null");
checkNotNull(queryResultsCodec, "queryResultsCodec is null");
checkNotNull(session, "session is null");
checkNotNull(query, "query is null");
this.httpClient = httpClient;
this.responseHandler = createFullJsonResponseHandler(queryResultsCodec);
this.debug = session.isDebug();
this.timeZoneId = session.getTimeZoneId();
this.query = query;
Request request = buildQueryRequest(session, query);
// send the request to the coordinator
currentResults.set(httpClient.execute(request, responseHandler).getValue());
}
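The excerpt does not show buildQueryRequest, so the following is only a rough sketch of what such a request might look like. It uses the JDK's java.net.http API (Java 11 or later) instead of the Airlift HttpClient that Presto actually uses. The /v1/statement path and the X-Presto-* header names are assumptions based on Presto's client REST protocol and are not confirmed by the code quoted here; StatementRequestSketch is a hypothetical class name. The request is built but never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch (not Presto code): builds, but does not send, a query request.
final class StatementRequestSketch
{
    private StatementRequestSketch() {}

    static HttpRequest buildQueryRequest(String server, String user, String catalog, String schema, String query)
    {
        return HttpRequest.newBuilder()
                // assumed endpoint for submitting a statement
                .uri(URI.create("http://" + server + "/v1/statement"))
                // assumed header names carrying the session settings
                .header("X-Presto-User", user)
                .header("X-Presto-Catalog", catalog)
                .header("X-Presto-Schema", schema)
                .header("X-Presto-Source", "presto-cli")
                // the SQL text travels as the request body
                .POST(HttpRequest.BodyPublishers.ofString(query, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args)
    {
        HttpRequest request = buildQueryRequest("localhost:8080", "alice", "default", "default", "select 1");
        System.out.println(request.method() + " " + request.uri());
    }
}
```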
Printing query progress and results
public void renderOutput(PrintStream out, OutputFormat outputFormat, boolean interactive)
{
SignalHandler oldHandler = Signal.handle(SIGINT, new SignalHandler()
{
@Override
public void handle(Signal signal)
{
if (ignoreUserInterrupt.get() || client.isClosed()) {
return;
}
try {
if (!client.cancelLeafStage()) {
client.close();
}
}
catch (RuntimeException e) {
log.debug(e, "error canceling leaf stage");
client.close();
}
}
});
try {
renderQueryOutput(out, outputFormat, interactive);
}
finally {
Signal.handle(SIGINT, oldHandler);
}
}
private void renderQueryOutput(PrintStream out, OutputFormat outputFormat, boolean interactive)
{
StatusPrinter statusPrinter = null;
@SuppressWarnings("resource")
PrintStream errorChannel = interactive ? out : System.err;
if (interactive) {
// interactive mode prints status updates; non-interactive mode waits for the result data
statusPrinter = new StatusPrinter(client, out);
statusPrinter.printInitialStatusUpdates();
}
else {
waitForData();
}
The method that prints the execution status (called above on the StatusPrinter instance):
public void printInitialStatusUpdates()
{
long lastPrint = System.nanoTime();
try {
while (client.isValid()) {
try {
// exit status loop if there is pending output
if (client.current().getData() != null) {
return;
}
// update screen if enough time has passed
if (Duration.nanosSince(lastPrint).getValue(SECONDS) >= 0.5) {
console.repositionCursor();
printQueryInfo(client.current());
lastPrint = System.nanoTime();
}
// fetch next results (server will wait for a while if no data)
client.advance();
}
catch (RuntimeException e) {
log.debug(e, "error printing status");
}
}
}
finally {
console.resetScreen();
}
}
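The core of printInitialStatusUpdates is a polling loop that keeps advancing the client until data arrives, while printing a status update at most once per interval. The sketch below isolates that pattern. StatusLoopSketch, PollingClient, and pollUntilData are hypothetical names; the injected clock exists only to make the loop deterministic, and the sketch leaves out the exception handling and screen reset of the original.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch (not Presto code) of a throttled status-polling loop.
final class StatusLoopSketch
{
    private StatusLoopSketch() {}

    // Minimal stand-in for the parts of StatementClient the loop relies on.
    interface PollingClient
    {
        boolean isValid();

        boolean hasData();

        void advance();
    }

    // Returns how many status updates were printed before data arrived
    // or the client stopped being valid.
    static int pollUntilData(PollingClient client, LongSupplier nanoClock, long minIntervalNanos)
    {
        int printed = 0;
        long lastPrint = nanoClock.getAsLong();
        while (client.isValid()) {
            // leave the status loop as soon as there is pending output
            if (client.hasData()) {
                return printed;
            }
            // print only if enough time has passed since the last update
            long now = nanoClock.getAsLong();
            if (now - lastPrint >= minIntervalNanos) {
                printed++;
                System.out.println("status update " + printed);
                lastPrint = now;
            }
            // fetch the next batch of results from the server
            client.advance();
        }
        return printed;
    }

    public static void main(String[] args)
    {
        int[] advances = {0};
        PollingClient client = new PollingClient()
        {
            public boolean isValid() { return true; }

            public boolean hasData() { return advances[0] >= 3; }

            public void advance() { advances[0]++; }
        };
        pollUntilData(client, System::nanoTime, 500_000_000L);
    }
}
```

Throttling on elapsed time rather than on the number of advance() calls keeps the screen refresh rate steady no matter how quickly the server responds.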
Appendix 1: default client session:
public class ClientOptions
{
@Option(name = "--server", title = "server", description = "Presto server location (default: localhost:8080)")
public String server = "localhost:8080";
@Option(name = "--user", title = "user", description = "Username")
public String user = System.getProperty("user.name");
@Option(name = "--source", title = "source", description = "Name of source making query")
public String source = "presto-cli";
@Option(name = "--catalog", title = "catalog", description = "Default catalog")
public String catalog = "default";
@Option(name = "--schema", title = "schema", description = "Default schema")
public String schema = "default";
@Option(name = {"-f", "--file"}, title = "file", description = "Execute statements from file and exit")
public String file;
@Option(name = "--debug", title = "debug", description = "Enable debug information")
public boolean debug;
@Option(name = "--execute", title = "execute", description = "Execute specified statements and exit")
public String execute;
@Option(name = "--output-format", title = "output-format", description = "Output format for batch mode (default: CSV)")
public OutputFormat outputFormat = OutputFormat.CSV;