Use cache instead of artifacts

This commit is contained in:
Sergey Dolin
2023-06-23 23:13:39 +02:00
parent c7d43763bf
commit ab422d01a2
76 changed files with 5960 additions and 5586 deletions

View File

@@ -0,0 +1,65 @@
import {createHttpClient, getCacheApiUrl} from './http-client';
import {retryTypedResponse} from './retry';
import {isSuccessStatusCode} from './http-responses';
import {HttpClient} from '@actions/http-client';
import {downloadCacheHttpClient} from '@actions/cache/lib/internal/downloadUtils';
import * as core from '@actions/core';
interface ArtifactCacheEntry {
cacheKey?: string;
scope?: string;
cacheVersion?: string;
creationTime?: string;
archiveLocation?: string;
}
const getCacheArchiveUrl = async (
httpClient: HttpClient,
cacheKey: string,
cacheVersion: string
): Promise<string | null> => {
// TODO: should work with delete?
const resource = `cache?keys=${cacheKey}&version=${cacheVersion}`;
const response = await retryTypedResponse('getCacheEntry', async () =>
httpClient.getJson<ArtifactCacheEntry>(getCacheApiUrl(resource))
);
// Cache not found
if (response.statusCode === 204) {
core.debug(
`There's no cache with key ${cacheKey} & version=${cacheVersion}`
);
// List cache for primary key only if cache miss occurs
return null;
}
if (!isSuccessStatusCode(response.statusCode)) {
throw new Error(`Cache service responded with ${response.statusCode}`);
}
const cacheResult = response.result;
core.debug(`getCacheEntry response is:\n${JSON.stringify(cacheResult)}`);
const cacheDownloadUrl = cacheResult?.archiveLocation;
if (!cacheDownloadUrl) {
// Cache archiveLocation not found. This should never happen, and hence bail out.
throw new Error('Cache not found.');
}
return cacheDownloadUrl;
};
export const downloadFileFromActionCache = async (
destFileName: string,
cacheKey: string,
cacheVersion: string
) => {
const httpClient = createHttpClient();
const archiveUrl = await getCacheArchiveUrl(
httpClient,
cacheKey,
cacheVersion
);
if (!archiveUrl) {
return undefined;
}
await downloadCacheHttpClient(archiveUrl, destFileName);
};

View File

@@ -0,0 +1,44 @@
import {HttpClient} from '@actions/http-client';
import {BearerCredentialHandler} from '@actions/http-client/lib/auth';
import {RequestOptions} from '@actions/http-client/lib/interfaces';
import * as core from '@actions/core';
const createAcceptHeader = (type: string, apiVersion: string): string =>
`${type};api-version=${apiVersion}`;
const getRequestOptions = (): RequestOptions => ({
headers: {
Accept: createAcceptHeader('application/json', '6.0-preview.1')
}
});
export const createHttpClient = (): HttpClient => {
const token = process.env['ACTIONS_RUNTIME_TOKEN'] || '';
const bearerCredentialHandler = new BearerCredentialHandler(token);
return new HttpClient(
'actions/cache',
[bearerCredentialHandler],
getRequestOptions()
);
};
export const getGitHubActionsApiUrl = (resource: string): string => {
const baseUrl: string = process.env['GITHUB_API_URL'] || '';
if (!baseUrl) {
throw new Error('GitHub API Url not found, unable to restore cache.');
}
const repo = process.env['GITHUB_REPOSITORY'];
const url = `${baseUrl}/repos/${repo}/actions/${resource}`;
core.debug(`Resource Url: ${url}`);
return url;
};
export const getCacheApiUrl = (resource: string): string => {
const baseUrl: string = process.env['ACTIONS_CACHE_URL'] || '';
if (!baseUrl) {
throw new Error('Cache Service Url not found, unable to restore cache.');
}
const url = `${baseUrl}_apis/artifactcache/${resource}`;
core.debug(`Resource Url: ${url}`);
return url;
};

View File

@@ -0,0 +1,19 @@
import {TypedResponse} from '@actions/http-client/lib/interfaces';
import {HttpClientError} from '@actions/http-client';
export const isSuccessStatusCode = (statusCode?: number): boolean => {
if (!statusCode) {
return false;
}
return statusCode >= 200 && statusCode < 300;
};
export function isServerErrorStatusCode(statusCode?: number): boolean {
if (!statusCode) {
return true;
}
return statusCode >= 500;
}
export interface TypedResponseWithError<T> extends TypedResponse<T> {
error?: HttpClientError;
}

View File

@@ -0,0 +1,127 @@
import {
HttpClientError,
HttpClientResponse,
HttpCodes
} from '@actions/http-client';
import {
isServerErrorStatusCode,
TypedResponseWithError
} from './http-responses';
import * as core from '@actions/core';
const isRetryableStatusCode = (statusCode?: number): boolean => {
if (!statusCode) {
return false;
}
const retryableStatusCodes = [
HttpCodes.BadGateway,
HttpCodes.ServiceUnavailable,
HttpCodes.GatewayTimeout
];
return retryableStatusCodes.includes(statusCode);
};
const sleep = (milliseconds: number): Promise<void> =>
new Promise(resolve => setTimeout(resolve, milliseconds));
// The default number of retry attempts.
const DefaultRetryAttempts = 2;
// The default delay in milliseconds between retry attempts.
const DefaultRetryDelay = 5000;
const retry = async <T>(
name: string,
method: () => Promise<T>,
getStatusCode: (arg0: T) => number | undefined,
maxAttempts = DefaultRetryAttempts,
delay = DefaultRetryDelay,
onError: ((arg0: Error) => T | undefined) | undefined = undefined
): Promise<T> => {
let errorMessage = '';
let attempt = 1;
while (attempt <= maxAttempts) {
let response: T | undefined = undefined;
let statusCode: number | undefined = undefined;
let isRetryable = false;
try {
response = await method();
} catch (error) {
if (onError) {
response = onError(error);
}
isRetryable = true;
errorMessage = error.message;
}
if (response) {
statusCode = getStatusCode(response);
if (!isServerErrorStatusCode(statusCode)) {
return response;
}
}
if (statusCode) {
isRetryable = isRetryableStatusCode(statusCode);
errorMessage = `Cache service responded with ${statusCode}`;
}
core.debug(
`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`
);
if (!isRetryable) {
core.debug(`${name} - Error is not retryable`);
break;
}
await sleep(delay);
attempt++;
}
throw Error(`${name} failed: ${errorMessage}`);
};
export const retryHttpClientResponse = async (
name: string,
method: () => Promise<HttpClientResponse>,
maxAttempts = DefaultRetryAttempts,
delay = DefaultRetryDelay
): Promise<HttpClientResponse> => {
return await retry(
name,
method,
(response: HttpClientResponse) => response.message.statusCode,
maxAttempts,
delay
);
};
export const retryTypedResponse = <T>(
name: string,
method: () => Promise<TypedResponseWithError<T>>,
maxAttempts = DefaultRetryAttempts,
delay = DefaultRetryDelay
): Promise<TypedResponseWithError<T>> =>
retry(
name,
method,
(response: TypedResponseWithError<T>) => response.statusCode,
maxAttempts,
delay,
// If the error object contains the statusCode property, extract it and return
// an TypedResponse<T> so it can be processed by the retry logic.
(error: Error) => {
if (error instanceof HttpClientError) {
return {
statusCode: error.statusCode,
result: null,
headers: {},
error
};
} else {
return undefined;
}
}
);

View File

@@ -0,0 +1,180 @@
import * as core from '@actions/core';
import fs from 'fs';
import {HttpClient} from '@actions/http-client';
import {TypedResponse} from '@actions/http-client/lib/interfaces';
import {ReserveCacheError, ValidationError} from '@actions/cache';
import {isSuccessStatusCode} from './http-responses';
import {retryHttpClientResponse, retryTypedResponse} from './retry';
import {getOctokit} from '@actions/github';
import {retry as octokitRetry} from '@octokit/plugin-retry';
import {createHttpClient, getCacheApiUrl} from './http-client';
const uploadFile = async (
httpClient: HttpClient,
cacheId: number,
filePath: string,
fileSize: number
): Promise<void> => {
if (fileSize <= 0) return;
const start = 0;
const end = fileSize - 1;
const contentRange = `bytes ${start}-${end}/*`;
core.debug(
`Uploading chunk of size ${
end - start + 1
} bytes at offset ${start} with content range: ${contentRange}`
);
const additionalHeaders = {
'Content-Type': 'application/octet-stream',
'Content-Range': contentRange
};
const resourceUrl = getCacheApiUrl(`caches/${cacheId.toString()}`);
const fd = fs.openSync(filePath, 'r');
try {
const uploadChunkResponse = await retryHttpClientResponse(
`uploadChunk (start: ${start}, end: ${end})`,
async () => {
const stream = fs
.createReadStream(filePath, {
fd,
start,
end,
autoClose: false
})
.on('error', error => {
throw new Error(
`Cache upload failed because file read failed with ${error.message}`
);
});
return httpClient.sendStream(
'PATCH',
resourceUrl,
stream,
additionalHeaders
);
}
);
if (!isSuccessStatusCode(uploadChunkResponse.message.statusCode)) {
throw new Error(
`Cache service responded with ${uploadChunkResponse.message.statusCode} during upload chunk.`
);
}
} finally {
fs.closeSync(fd);
}
return;
};
const resetCacheWithOctokit = async (cacheKey: string): Promise<void> => {
const token = core.getInput('repo-token');
const client = getOctokit(token, undefined, octokitRetry);
// TODO: better way to get repository?
const repo = process.env['GITHUB_REPOSITORY'];
core.debug(`remove cache "${cacheKey}"`);
try {
// TODO: replace with client.rest.
await client.request(
`DELETE /repos/${repo}/actions/caches?key=${cacheKey}`
);
} catch (error) {
if (error.status) {
core.debug(`Cache ${cacheKey} does not exist`);
} else {
throw error;
}
}
};
const reserveCache = async (
httpClient: HttpClient,
fileSize: number,
cacheKey: string,
cacheVersion: string
): Promise<number> => {
const reserveCacheRequest = {
key: cacheKey,
version: cacheVersion,
cacheSize: fileSize
};
const response = await retryTypedResponse('reserveCache', async () =>
httpClient.postJson<{cacheId: number}>(
getCacheApiUrl('caches'),
reserveCacheRequest
)
);
// handle 400 in the special way
if (response?.statusCode === 400)
throw new Error(
response?.error?.message ??
`Cache size of ~${Math.round(
fileSize / (1024 * 1024)
)} MB (${fileSize} B) is over the data cap limit, not saving cache.`
);
const cacheId = response?.result?.cacheId;
if (cacheId === undefined)
throw new ReserveCacheError(
`Unable to reserve cache with key ${cacheKey}, another job may be creating this cache. More details: ${response?.error?.message}`
);
return cacheId;
};
const commitCache = async (
httpClient: HttpClient,
cacheId: number,
filesize: number
): Promise<void> => {
const response = (await retryTypedResponse('commitCache', async () =>
httpClient.postJson<null>(getCacheApiUrl(`caches/${cacheId.toString()}`), {
size: filesize
})
)) as TypedResponse<null>;
if (!isSuccessStatusCode(response.statusCode)) {
throw new Error(
`Cache service responded with ${response.statusCode} during commit cache.`
);
}
};
export const uploadFileToActionsCache = async (
filePath: string,
cacheKey: string,
cacheVersion: string
) => {
try {
await resetCacheWithOctokit(cacheKey);
const fileSize = fs.statSync(filePath).size;
if (fileSize === 0) {
core.info(`the cache ${cacheKey} will be removed`);
return;
}
const httpClient = createHttpClient();
const cacheId = await reserveCache(
httpClient,
fileSize,
cacheKey,
cacheVersion
);
await uploadFile(httpClient, cacheId, filePath, fileSize);
await commitCache(httpClient, cacheId, fileSize);
} catch (error) {
const typedError = error as Error;
if (typedError.name === ValidationError.name) {
throw error;
} else if (typedError.name === ReserveCacheError.name) {
core.info(`Failed to save: ${typedError.message}`);
} else {
core.warning(`Failed to save: ${typedError.message}`);
}
}
};

View File

@@ -27,7 +27,7 @@ export class Issue implements IIssue {
constructor(
options: Readonly<IIssuesProcessorOptions>,
issue: Readonly<OctokitIssue> | Readonly<IIssue>
issue: Readonly<OctokitIssue | Readonly<IIssue>>
) {
this._options = options;
this.title = issue.title;

View File

@@ -26,7 +26,7 @@ import {Statistics} from './statistics';
import {LoggerService} from '../services/logger.service';
import {OctokitIssue} from '../interfaces/issue';
import {retry} from '@octokit/plugin-retry';
import {IState} from '../interfaces/state';
import {IState} from '../interfaces/state/state';
/***
* Handle processing of issues for staleness/closure.
@@ -146,6 +146,12 @@ export class IssuesProcessor {
}
const issueLogger: IssueLogger = new IssueLogger(issue);
if (this.state.isIssueProcessed(issue)) {
issueLogger.info(
' $$type skipped due being processed during the previous run'
);
continue;
}
await issueLogger.grouping(`$$type #${issue.number}`, async () => {
await this.processIssue(
issue,
@@ -154,6 +160,7 @@ export class IssuesProcessor {
labelsToRemoveWhenStale
);
});
this.state.addIssueToProcessed(issue);
}
if (!this.operations.hasRemainingOperations()) {
@@ -201,15 +208,6 @@ export class IssuesProcessor {
)}`
);
if (this.state.isIssueProcessed(issue)) {
issueLogger.info(
' $$type skipped due being processed during the previous run'
);
return;
}
this.state.addIssueToProcessed(issue);
// calculate string based messages for this issue
const staleMessage: string = issue.isPullRequest
? this.options.stalePrMessage
@@ -574,9 +572,11 @@ export class IssuesProcessor {
});
this.statistics?.incrementFetchedItemsCount(issueResult.data.length);
return issueResult.data.map(
(issue: Readonly<OctokitIssue>): Issue => new Issue(this.options, issue)
);
// state_reason is incompatible - oktokit dependency conflict?
// return issueResult.data.map((issue: Readonly<OctokitIssue>): Issue => {
return issueResult.data.map((issue): Issue => {
return new Issue(this.options, issue as Readonly<OctokitIssue>);
});
} catch (error) {
throw Error(`Getting issues was blocked by the error: ${error.message}`);
}

View File

@@ -1,115 +0,0 @@
import {Issue} from './issue';
import {IState} from '../interfaces/state';
import os from 'os';
import crypto from 'crypto';
import fs from 'fs';
import path from 'path';
// import * as artifact from '@actions/artifact';
import * as core from '@actions/core';
import {restoreCache, saveCache} from '@actions/cache';
type IssueID = number;
export class State implements IState {
private processedIssuesIDs: Set<IssueID>;
/**
* @private don't mutate in the debug mode
*/
private readonly debug: boolean;
constructor() {
this.processedIssuesIDs = new Set();
this.debug = core.getInput('debug-only') === 'true';
}
isIssueProcessed(issue: Issue) {
return this.processedIssuesIDs.has(issue.number);
}
addIssueToProcessed(issue: Issue) {
if (!this.debug) this.processedIssuesIDs.add(issue.number);
}
reset() {
if (!this.debug) this.processedIssuesIDs.clear();
}
private static readonly ARTIFACT_NAME = '_state';
private static readonly STATE_FILE = 'state.txt';
async persist() {
if (this.debug) {
core.debug('The state is not persisted in the debug mode');
return;
}
const serialized = Array.from(this.processedIssuesIDs).join('|');
const tmpDir = os.tmpdir();
const file = path.join(
tmpDir,
State.STATE_FILE
// crypto.randomBytes(8).readBigUInt64LE(0).toString()
);
fs.writeFileSync(file, serialized);
core.debug(
`Persisting state includes info about ${this.processedIssuesIDs.size} issue(s)`
);
// const artifactClient = artifact.create();
try {
saveCache([tmpDir], State.ARTIFACT_NAME);
// await artifactClient.uploadArtifact(State.ARTIFACT_NAME, [file], tmpDir);
} catch (error) {
core.warning(
`Persisting the state was not successful due to "${
error.message || 'unknown reason'
}"`
);
}
}
async rehydrate() {
this.reset();
const tmpDir = os.tmpdir();
// const artifactClient = artifact.create();
try {
/*
const downloadResponse = await artifactClient.downloadArtifact(
State.ARTIFACT_NAME,
tmpDir
);
*/
await restoreCache([tmpDir], State.ARTIFACT_NAME);
/*
const downloadedFiles = fs.readdirSync(downloadResponse.downloadPath);
if (downloadedFiles.length === 0) {
throw Error(
'There is no data in the state artifact, probably because of the previous run failed'
);
}
*/
const serialized = fs.readFileSync(
path.join(tmpDir, State.ARTIFACT_NAME),
{encoding: 'utf8'}
);
if (serialized.length === 0) return;
const issueIDs = serialized
.split('|')
.map(parseInt)
.filter(i => !isNaN(i));
this.processedIssuesIDs = new Set(issueIDs);
core.debug(
`Rehydrated state includes info about ${issueIDs.length} issue(s)`
);
} catch (error) {
core.warning(
`Rehydrating the state was not successful due to "${
error.message || 'unknown reason'
}"`
);
}
}
}

View File

@@ -0,0 +1,52 @@
import {IStateStorage} from '../../interfaces/state/state-storage';
import fs from 'fs';
import path from 'path';
import os from 'os';
import * as core from '@actions/core';
import {uploadFileToActionsCache} from '../actions-cache/upload';
import {downloadFileFromActionCache} from '../actions-cache/download';
const CACHE_KEY = '_state';
const CACHE_VERSION = '1';
const STATE_FILE = 'state.txt';
export class StateCacheStorage implements IStateStorage {
async save(serializedState: string): Promise<void> {
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'state-'));
const file = path.join(tmpDir, STATE_FILE);
fs.writeFileSync(file, serializedState);
try {
await uploadFileToActionsCache(file, CACHE_KEY, CACHE_VERSION);
} catch (error) {
core.warning(
`Saving the state was not successful due to "${
error.message || 'unknown reason'
}"`
);
}
}
async restore(): Promise<string> {
const tmpDir = fs.mkdtempSync('state-');
const fileName = path.join(tmpDir, STATE_FILE);
try {
await downloadFileFromActionCache(fileName, CACHE_KEY, CACHE_VERSION);
if (!fs.existsSync(fileName)) {
core.info(
'There is no state persisted, probably because of the very first run or previous run failed'
);
return '';
}
return fs.readFileSync(path.join(tmpDir, STATE_FILE), {
encoding: 'utf8'
});
} catch (error) {
core.warning(
`Restoring the state was not successful due to "${
error.message || 'unknown reason'
}"`
);
return '';
}
}
}

View File

@@ -0,0 +1,71 @@
import {Issue} from '../issue';
import {IState} from '../../interfaces/state/state';
import * as core from '@actions/core';
import {IIssuesProcessorOptions} from '../../interfaces/issues-processor-options';
import {IStateStorage} from '../../interfaces/state/state-storage';
export type IssueID = number;
export class NoStateError extends Error {}
export class State implements IState {
/**
* @private don't mutate in the debug mode
*/
private readonly debug: boolean;
private readonly processedIssuesIDs: Set<IssueID>;
private readonly stateStorage: IStateStorage;
constructor(stateStorage: IStateStorage, options: IIssuesProcessorOptions) {
this.debug = options.debugOnly;
this.processedIssuesIDs = new Set();
this.stateStorage = stateStorage;
}
isIssueProcessed(issue: Issue) {
return this.processedIssuesIDs.has(issue.number);
}
addIssueToProcessed(issue: Issue) {
core.debug(`state: mark ${issue.number} as processed`);
if (!this.debug) this.processedIssuesIDs.add(issue.number);
}
reset() {
core.debug('state: reset');
if (!this.debug) this.processedIssuesIDs.clear();
}
private deserialize(serialized: string) {
const issueIDs = serialized
.split('|')
.map(id => parseInt(id))
.filter(i => !isNaN(i));
this.processedIssuesIDs.clear();
issueIDs.forEach(issueID => this.processedIssuesIDs.add(issueID));
}
private get serialized(): string {
return Array.from(this.processedIssuesIDs).join('|');
}
async persist(): Promise<void> {
if (this.debug) {
core.debug('The state is not persisted in the debug mode');
return;
}
core.info(
`state: persisting info about ${this.processedIssuesIDs.size} issue(s)`
);
return this.stateStorage.save(this.serialized);
}
async rehydrate(): Promise<void> {
this.reset();
const serialized = await this.stateStorage.restore();
this.deserialize(serialized);
core.info(
`state: rehydrated info about ${this.processedIssuesIDs.size} issue(s)`
);
}
}